/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.phoenix.end2end.index;

import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
import static org.junit.Assert.*;

import java.sql.*;
import java.sql.Connection;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver;
import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.query.BaseTest;
import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.util.*;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(Parameterized.class)
@Category(NeedsOwnMiniClusterTest.class)
public class MutableIndexExtendedIT extends ParallelStatsDisabledIT {

  private static final Logger LOG = LoggerFactory.getLogger(MutableIndexExtendedIT.class);
  protected final boolean localIndex;
  protected final String tableDDLOptions;

  public MutableIndexExtendedIT(Boolean localIndex, String txProvider, Boolean columnEncoded) {
    this.localIndex = localIndex;
    StringBuilder optionBuilder = new StringBuilder();
    if (txProvider != null) {
      optionBuilder.append("TRANSACTIONAL=true," + PhoenixDatabaseMetaData.TRANSACTION_PROVIDER
        + "='" + txProvider + "'");
    }
    if (!columnEncoded) {
      if (optionBuilder.length() != 0) optionBuilder.append(",");
      optionBuilder.append("COLUMN_ENCODED_BYTES=0");
    }
    this.tableDDLOptions = optionBuilder.toString();
  }

  private static Connection getConnection(Properties props) throws SQLException {
    props.setProperty(QueryServices.INDEX_MUTATE_BATCH_SIZE_THRESHOLD_ATTRIB, Integer.toString(1));
    Connection conn = DriverManager.getConnection(getUrl(), props);
    return conn;
  }

  protected static Connection getConnection() throws SQLException {
    Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
    return getConnection(props);
  }

  // name is used by failsafe as file name in reports
  @Parameterized.Parameters(
      name = "MutableIndexExtendedIT_localIndex={0},transactionProvider={1},columnEncoded={2}")
  public static Collection<Object[]> data() {
    return Arrays.asList(new Object[][] { { false, null, false }, { false, null, true },
      // OMID does not support local indexes or column encoding
      { false, "OMID", false }, { true, null, false }, { true, null, true }, });
  }

  // some tables (e.g. indexes on views) have UngroupedAgg coproc loaded, but don't have a
  // corresponding row in syscat. This tests that compaction isn't blocked
  // TODO: Move to a different test class?
  @Test(timeout = 120000)
  public void testCompactNonPhoenixTable() throws Exception {
    if (localIndex || tableDDLOptions.contains("TRANSACTIONAL=true")) return;

    try (Connection conn = getConnection()) {
      // create a vanilla HBase table (non-Phoenix)
      String randomTable = generateUniqueName();
      TableName hbaseTN = TableName.valueOf(randomTable);
      byte[] famBytes = Bytes.toBytes("fam");
      Table hTable = getUtility().createTable(hbaseTN, famBytes);
      TestUtil.addCoprocessor(conn, randomTable, UngroupedAggregateRegionObserver.class);
      Put put = new Put(Bytes.toBytes("row"));
      byte[] value = new byte[1];
      Bytes.random(value);
      put.addColumn(famBytes, Bytes.toBytes("colQ"), value);
      hTable.put(put);

      // major compaction shouldn't cause a timeout or RS abort
      List<HRegion> regions = getUtility().getHBaseCluster().getRegions(hbaseTN);
      HRegion hRegion = regions.get(0);
      hRegion.flush(true);
      HStore store = hRegion.getStore(famBytes);
      // Trigger major compaction
      store.triggerMajorCompaction();
      Optional<CompactionContext> requestCompaction =
        store.requestCompaction(org.apache.hadoop.hbase.regionserver.Store.PRIORITY_USER,
          CompactionLifeCycleTracker.DUMMY, null);
      store.compact(requestCompaction.get(), NoLimitThroughputController.INSTANCE, null);
      assertEquals(1, store.getStorefiles().size());

      // we should be able to compact syscat itself as well
      regions = getUtility().getHBaseCluster()
        .getRegions(TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME));
      hRegion = regions.get(0);
      hRegion.flush(true);
      store = hRegion.getStore(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES);
      // Trigger major compaction
      store.triggerMajorCompaction();
      requestCompaction =
        store.requestCompaction(org.apache.hadoop.hbase.regionserver.Store.PRIORITY_USER,
          CompactionLifeCycleTracker.DUMMY, null);
      store.compact(requestCompaction.get(), NoLimitThroughputController.INSTANCE, null);
      assertEquals(1, store.getStorefiles().size());
    }
  }

  @Test
  @Ignore
  public void testIndexHalfStoreFileReader() throws Exception {
    if (!localIndex) return;

    Connection conn1 = getConnection();
    ConnectionQueryServices connectionQueryServices =
      driver.getConnectionQueryServices(getUrl(), TestUtil.TEST_PROPERTIES);
    Admin admin = connectionQueryServices.getAdmin();
    String tableName = "TBL_" + generateUniqueName();
    String indexName = "IDX_" + generateUniqueName();
    createBaseTable(conn1, tableName, "('e')");
    conn1.createStatement().execute("CREATE " + (localIndex ? "LOCAL" : "") + " INDEX " + indexName
      + " ON " + tableName + "(v1)" + (localIndex ? "" : " SPLIT ON ('e')"));
    conn1.createStatement().execute("UPSERT INTO " + tableName + " values('b',1,2,4,'z')");
    conn1.createStatement().execute("UPSERT INTO " + tableName + " values('f',1,2,3,'z')");
    conn1.createStatement().execute("UPSERT INTO " + tableName + " values('j',2,4,2,'a')");
    conn1.createStatement().execute("UPSERT INTO " + tableName + " values('q',3,1,1,'c')");
    conn1.commit();

    String query = "SELECT count(*) FROM " + tableName + " where v1<='z'";
    ResultSet rs = conn1.createStatement().executeQuery(query);
    assertTrue(rs.next());
    assertEquals(4, rs.getInt(1));

    TableName indexTable = TableName.valueOf(localIndex ? tableName : indexName);
    admin.flush(indexTable);
    boolean merged = false;
    // merge regions until 1 left
    long numRegions = 0;
    while (true) {
      rs = conn1.createStatement().executeQuery(query);
      assertTrue(rs.next());
      assertEquals(4, rs.getInt(1)); // TODO this returns 5 sometimes instead of 4, duplicate
                                     // results?
      try {
        List<RegionInfo> indexRegions = admin.getRegions(indexTable);
        numRegions = indexRegions.size();
        if (numRegions == 1) {
          break;
        }
        if (!merged) {
          List<RegionInfo> regions = admin.getRegions(indexTable);
          LOG.info("Merging: " + regions.size());
          admin.mergeRegionsAsync(regions.get(0).getEncodedNameAsBytes(),
            regions.get(1).getEncodedNameAsBytes(), false);
          merged = true;
          Threads.sleep(10000);
        }
      } catch (Exception ex) {
        LOG.info("error:", ex);
      }
      long waitStartTime = System.currentTimeMillis();
      // wait until merge happened
      while (System.currentTimeMillis() - waitStartTime < 10000) {
        List<RegionInfo> regions = admin.getRegions(indexTable);
        LOG.info("Waiting:" + regions.size());
        if (regions.size() < numRegions) {
          break;
        }
        Threads.sleep(1000);
      }
      SnapshotTestingUtils.waitForTableToBeOnline(BaseTest.getUtility(), indexTable);
      assertTrue("Index table should be online ", admin.isTableAvailable(indexTable));
    }
  }

  protected void createBaseTable(Connection conn, String tableName, String splits)
    throws SQLException {
    String ddl = "CREATE TABLE " + tableName + " (t_id VARCHAR NOT NULL,\n"
      + "k1 INTEGER NOT NULL,\n" + "k2 INTEGER NOT NULL,\n" + "k3 INTEGER,\n" + "v1 VARCHAR,\n"
      + "CONSTRAINT pk PRIMARY KEY (t_id, k1, k2))\n"
      + (tableDDLOptions != null ? tableDDLOptions : "")
      + (splits != null ? (" split on " + splits) : "");
    conn.createStatement().execute(ddl);
  }
}
